﻿using CRL.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using System.Collections;
using System.Threading.Tasks;
using CRL.Core.Extension;

namespace CRL.EventBus.Queue
{
    /// <summary>
    /// In-process queue implementation. Published messages are handed straight to the
    /// handlers registered on a process-wide static event; nothing is buffered or persisted.
    /// </summary>
    class MemoryQueue : AbsQueue
    {
        ConfigBase _queueConfig;

        /// <summary>
        /// Static event shared by every <see cref="MemoryQueue"/> instance: a message published
        /// through one instance reaches the subscribers registered through any other instance.
        /// </summary>
        static event MqEventHandler onData;

        /// <summary>Identifies this implementation as the in-memory queue.</summary>
        public override MQType MQType => MQType.Memory;

        /// <summary>Creates the queue and keeps a reference to the supplied configuration.</summary>
        /// <param name="queueConfig">Queue configuration; only stored by this class.</param>
        public MemoryQueue(ConfigBase queueConfig)
        {
            _queueConfig = queueConfig;
        }

        /// <summary>
        /// Removes every registered subscriber. Because the event is static this affects
        /// all <see cref="MemoryQueue"/> instances, not only the one being disposed.
        /// </summary>
        public override void Dispose()
        {
            onData = null;
        }

        /// <summary>
        /// Publishes each message in <paramref name="msg"/> to the current subscribers.
        /// </summary>
        /// <param name="routingKey">Routing key; subscribers whose event name differs ignore the message.</param>
        /// <param name="msg">Messages to publish. An empty sequence publishes nothing.</param>
        /// <param name="optFunc">Optional callback used to customise the publish options.</param>
        /// <exception cref="ArgumentNullException"><paramref name="msg"/> is null.</exception>
        public override void PublishList(string routingKey, IEnumerable<object> msg, Action<PublishOption> optFunc = null)
        {
            if (msg == null)
            {
                throw new ArgumentNullException(nameof(msg));
            }

            var opt = new PublishOption();
            optFunc?.Invoke(opt);

            // Materialize once: the source may be a lazy sequence that is expensive
            // (or unsafe) to enumerate more than one time.
            var items = msg.ToList();
            var time = DateTime.Now.AddMilliseconds(opt.DelayMs);

            // Every element is delivered; previously only the first one was sent and the
            // remaining messages were silently dropped.
            foreach (var item in items)
            {
                // Null-conditional invoke: publishing with no subscribers is a no-op
                // instead of a NullReferenceException.
                onData?.Invoke(this, new MsgPackage2 { RoutingKey = routingKey, _InnerData = item, Time = time, MsgKey = opt.MsgKey });
            }
        }

        /// <summary>
        /// Task-returning wrapper over <see cref="PublishList"/>; the work itself runs synchronously.
        /// </summary>
        /// <param name="routingKey">Routing key of the messages.</param>
        /// <param name="msgs">Messages to publish.</param>
        /// <param name="optFunc">Optional callback used to customise the publish options.</param>
        /// <returns>An already completed task.</returns>
        public override Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            PublishList(routingKey, msgs, optFunc);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> again, copying routing key, priority and message key
        /// from <paramref name="data"/> and incrementing the retry counter by one.
        /// </summary>
        /// <param name="data">The package that was originally delivered.</param>
        /// <param name="msg">The payload to deliver again.</param>
        public override void RePublish(IData data, object msg)
        {
            // Null-conditional invoke: safe when all subscribers were removed (e.g. after Dispose).
            onData?.Invoke(this, new MsgPackage2 { RoutingKey = data.RoutingKey, RetryTimes = data.RetryTimes + 1, _InnerData = msg, Priority = data.Priority, MsgKey = data.MsgKey });
        }

        /// <summary>
        /// Registers a subscriber that is processed synchronously on the publishing thread.
        /// </summary>
        /// <param name="eventDeclare">Declaration of the subscribing event handler.</param>
        public override void Subscribe(EventDeclare eventDeclare)
        {
            onData += (s, e) =>
            {
                // Blocks until the handler finished. Wait() is kept (rather than
                // GetAwaiter().GetResult()) so failures still surface to the publisher
                // as AggregateException, exactly as before.
                SubscribeData(eventDeclare, e).Wait();
            };
        }

        /// <summary>
        /// Registers a subscriber whose handler is awaited instead of blocked on.
        /// </summary>
        /// <param name="eventDeclare">Declaration of the subscribing event handler.</param>
        public override void SubscribeAsync(EventDeclare eventDeclare)
        {
            onData += async (s, e) =>
            {
                // The lambda is async void, so an exception escaping it cannot be observed by
                // the publisher and would be raised as an unhandled exception. Log it instead.
                try
                {
                    await SubscribeData(eventDeclare, e);
                }
                catch (Exception ex)
                {
                    Log(eventDeclare, ex.ToString());
                }
            };
        }

        /// <summary>
        /// Delivers one package to one subscriber.
        /// </summary>
        /// <param name="ed">The subscriber declaration.</param>
        /// <param name="data">The published package.</param>
        /// <returns>
        /// <c>false</c> when the package's routing key does not match the subscriber's name;
        /// <c>true</c> once the handler was invoked and its result was passed to <c>CheckResult</c>.
        /// </returns>
        /// <exception cref="Exception">
        /// The payload's runtime type (or a null payload) does not match the subscriber's declared data type.
        /// </exception>
        async Task<bool> SubscribeData(EventDeclare ed, MsgPackage2 data)
        {
            if (ed.Name != data.RoutingKey)
            {
                return false;
            }

            // A null payload has no runtime type; treat it as a type conflict rather than
            // failing with a NullReferenceException.
            var actualType = data._InnerData?.GetType();
            if (ed.EventDataType != actualType)
            {
                // Message text: "publish and subscribe type conflict".
                throw new Exception($"发布和订阅类型冲突 subName:{ed.Name} {ed.EventDataType}=>{actualType}");
            }

            var instance = ed.CreateServiceInstance();
            object result;
            try
            {
                result = await getInvokeResult(ed, instance, data._InnerData);
            }
            catch (Exception ex)
            {
                // A failing handler is reported as an unsuccessful invocation and logged.
                result = false;
                Log(ed, ex.ToString());
            }

            var invokeSuccess = checkInvokeSuccess(result);
            CheckResult(invokeSuccess, data, ed, data._InnerData);
            return true;
        }

        /// <summary>
        /// No-op: this class keeps no backlog of messages, so there is nothing to remove.
        /// </summary>
        /// <param name="routingKey">Ignored.</param>
        /// <returns>Always 0.</returns>
        public override long CleanQueue(string routingKey)
        {
            return 0;
        }

        /// <summary>
        /// This class keeps no backlog of messages, so the reported length is constant.
        /// </summary>
        /// <param name="routingKey">Ignored.</param>
        /// <returns>Always 0.</returns>
        public override long GetQueueLength(string routingKey)
        {
            return 0;
        }
    }
    /// <summary>
    /// Envelope that carries one published message through the in-memory queue.
    /// </summary>
    internal class MsgPackage2 : IData
    {
        /// <summary>Unique package id; defaults to a new GUID formatted with "N" (32 hex digits, no hyphens).</summary>
        public string Id { get; set; } = Guid.NewGuid().ToString("N");

        /// <summary>Timestamp of the package; defaults to the local time at which the instance is created.</summary>
        public DateTime Time { get; set; } = DateTime.Now;

        /// <summary>Routing key the message was published with.</summary>
        public string RoutingKey { get; set; }

        /// <summary>Optional message key supplied by the publisher.</summary>
        public string MsgKey { get; set; }

        /// <summary>The message payload itself.</summary>
        public object _InnerData { get; set; }

        /// <summary>Number of times the message has been re-published.</summary>
        public int RetryTimes { get; set; }

        /// <summary>Delay in milliseconds. NOTE(review): presumably the requested delivery delay — confirm against IData consumers.</summary>
        public int DelayMs { get; set; }

        /// <summary>Priority value carried along with the message.</summary>
        public int Priority { get; set; }
    }
//    class MemoryPublisher : AbsPublisher
//    {
//#if NETSTANDARD 
//        public MemoryPublisher(Microsoft.Extensions.Options.IOptions<QueueConfig> options) : base(options.Value)
//        {
//            queue = QueueFactory.CreateClient(options.Value.GetMqSetting(MQType.Memory), false);
//            //queueConfig = options.Value;
//        }
//#endif
//        public MemoryPublisher(QueueConfig _queueConfig) : base(_queueConfig)
//        {
//            queue = QueueFactory.CreateClient(_queueConfig.GetMqSetting(MQType.Memory), false);
//        }
//    }

    /// <summary>
    /// Signature of the handlers attached to the in-memory queue's static data event.
    /// </summary>
    /// <param name="sender">The object that raised the event.</param>
    /// <param name="e">The message package being delivered.</param>
    delegate void MqEventHandler(object sender, MsgPackage2 e);
}
